package com.bjy.qa.agent.transport.websocket;

import com.alibaba.fastjson.JSONObject;
import com.bjy.qa.agent.context.Context;
import com.bjy.qa.agent.enumtype.RunStatus;
import com.bjy.qa.agent.exception.MyException;
import com.bjy.qa.agent.model.KeyValueStore;
import com.bjy.qa.agent.tester.LogUtil;
import com.bjy.qa.agent.tools.json.JsonHelper;
import okhttp3.Response;
import okio.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.UnsupportedEncodingException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/**
 * BRTC WebSocket 服务接口
 */
public class BRTCWebSocketService implements IWebSocketService {
    private static final Logger logger = LoggerFactory.getLogger(BRTCWebSocketService.class);

    private Context context; // 上下文, 保存变量、参数、环境
    LogUtil logUtil; // 设置上报运行结果工具类
    private WebSocketHelper webSocketHelper;

    private boolean opened = false; // WebSocket 链接是否已经打开

    private JSONObject options; // 扩展数据（通道、服务器是否要关闭连接）
    private String channel; // 拼接好的上报通道字符串
    private Boolean close; // 服务器是否主动关闭 WebSocket 连接

    private List<SendThread> sendThreadList = new ArrayList<SendThread>(); // 发送线程列表

    private AtomicLong autoId = new AtomicLong(0); // 自增 ID
    private Map<Long, ResponseMessages> pendingResponseMessages = new ConcurrentHashMap<>(); // 待处理的，请求后服务器响应的消息列表
    private List<PushMessages> pendingPushMessages = new ArrayList<>(); // 待处理的，服务器主动推送过来的消息列表
    private StringBuffer errorMessages = new StringBuffer(); // 错误消息列表

    /**
     * 构造函数
     * @param endingType 结束符类型
     * @param url url
     * @param urlParams url 参数
     * @throws UnsupportedEncodingException
     * @throws InterruptedException
     */
    public BRTCWebSocketService(Context context, LogUtil logUtil, EndingType endingType, String url, List<KeyValueStore> urlParams) throws UnsupportedEncodingException, InterruptedException {
        this.context = context;
        this.logUtil = logUtil;
        if (webSocketHelper == null) {
            openWebSocket(endingType, url, urlParams);
        }
    }

    @Override
    public void send(String sendText, long interval, String assertStr, String setStr, boolean lose, JSONObject options) {
        this.options = options;
        this.channel = "通道" + options.getString("channel") + ": ";
        this.close = options.getBoolean("close");

        ResponseMessages tmp = new ResponseMessages();
        tmp.setLose(lose);
        if (assertStr != null) { // 如果需要断言，就添加到待处理的响应消息
            tmp.setAssertStr(assertStr);
        }
        if ((setStr != null) && (JSONObject.parseObject(setStr).size() > 0)) { // 如果需要保存变量，就添加到待处理的响应消息
            tmp.setSetStr(setStr);
        }

        if (interval == 0) { // 发送间隔为 0，直接发送一次
            sending(sendText, lose, tmp);
        } else { // 发送间隔不为 0，启动发送线程，定时重复发送
            SendThread sendThread = new SendThread(interval, sendText, lose, tmp);
            sendThreadList.add(sendThread);
            sendThread.start();
        }
    }

    @Override
    public void receive(String sendText, long interval, String assertStr, String setStr, boolean lose, JSONObject options) {
        this.options = options;
        this.channel = "通道" + options.getString("channel") + ": ";
        this.close = options.getBoolean("close");

        PushMessages tmp = new PushMessages();
        tmp.setLose(lose);
        if (assertStr != null) { // 如果需要断言，就添加到待处理的服务器主动推送过来的消息
            tmp.setAssertStr(assertStr);
        }
        if ((setStr != null) && (JSONObject.parseObject(setStr).size() > 0)) { // 如果需要保存变量，就添加到待处理的服务器主动推送过来的消息
            tmp.setSetStr(setStr);
        }
        if ((sendText != null) && (JSONObject.parseObject(sendText).size() > 0)) { // 如果收到 push 后需要回消息，就添加到待处理的服务器主动推送过来的消息
            tmp.setSendText(sendText);
        }

        pendingPushMessages.add((PushMessages) tmp.clone());
    }

    /**
     * 关闭连接
     */
    @Override
    public void close(String msg) {
        // 循环关闭所有发送线程
        for (SendThread sendThread : sendThreadList) {
            sendThread.interrupt();
        }
        webSocketHelper.close(msg); // 关闭 websocket 连接
        webSocketHelper = null;
    }

    @Override
    public boolean hasPending() {
        if (pendingResponseMessages.size() != 0) { // 待处理的，请求后服务器响应的消息列表
            return true;
        }
        if (pendingPushMessages.size() != 0) { // 待处理的，服务器主动推送过来的消息列表
            return true;
        }
        return false;
    }

    @Override
    public String getPendingMessages() {
        StringBuffer sb = new StringBuffer();
        if (pendingResponseMessages.size() != 0) { // 待处理的，请求后服务器响应的消息列表
            sb.append("存在待处理的请求后服务器响应的消息: ");
            sb.append(pendingResponseMessages.toString());
            sb.append("\n");
        }
        if (pendingPushMessages.size() != 0) { // 待处理的，服务器主动推送过来的消息列表
            sb.append("存在待处理的服务器主动推送过来的消息: ");
            sb.append(pendingPushMessages.toString());
        }
        return sb.toString();
    }

    /**
     * 打开 websocket 连接
     * @param endingType 结束符类型
     * @param url url
     * @param urlParams url 参数
     * @throws InterruptedException
     * @throws UnsupportedEncodingException
     */
    private void openWebSocket(EndingType endingType, String url, List<KeyValueStore> urlParams) throws InterruptedException, UnsupportedEncodingException {
        webSocketHelper = new WebSocketHelper().getInstance(endingType, url, urlParams, new WebSocketHelper.WebSocketListener() {
            @Override
            public void onOpen(WebSocketHelper webSocketHelper, Response response) {
                opened = true;
            }

            @Override
            public void onMessage(WebSocketHelper webSocketHelper, String text) {
                // 从收到数据中读取 id、seq
                JsonHelper jsonHelper = new JsonHelper(text);
                Long id = jsonHelper.getLong("$.id");
                Long seq = jsonHelper.getLong("$.seq");

                if (id == null && seq == null) { // 非 BRTC 信令格式消息
                    logUtil.sendStepLog(RunStatus.ERROR, "", "BRTCWebSocket", channel + "收到非 BRTC 信令格式消息：" + text);
                    logUtil.sendStatusLog(RunStatus.FAILED);
                } else if (seq == null) { // 请求后服务器响应的消息列表
                    ResponseMessages tmp = pendingResponseMessages.get(id); // 根据 id 取回待处理的响应消息
                    if (tmp == null) {
                        logUtil.sendStepLog(RunStatus.ERROR, "", "BRTCWebSocket", channel + "收到未知的响应消息: " + text);
                        logUtil.sendStatusLog(RunStatus.FAILED);
                    } else {
                        if (!tmp.isLose()) {
                            logUtil.sendStepLog(RunStatus.INFO, "", "", channel + "BRTCWebSocket receive: " + text);
                        }
                        // 断言
                        if (tmp.getAssertStr() != null) {
                            try {
                                Map<String, String> exp = new HashMap<>();
                                exp.put("body", tmp.getAssertStr());

                                com.bjy.qa.agent.response.Response response = new com.bjy.qa.agent.response.Response(text, null);
                                response.verify(exp);

                                if (!tmp.isLose()) {
                                    logUtil.sendStepLog(RunStatus.INFO, "", "BRTCWebSocket 断言", channel + "id: " + id + ", 期望：" + tmp.getAssertStr());
                                }
                            } catch (Throwable e) {
                                errorMessages.append("WebSocket 断言异常：\n");
                                errorMessages.append(e.getMessage());
                                errorMessages.append("\n");

                                logUtil.sendStepLog(RunStatus.ERROR, "", "BRTCWebSocket 断言", channel + "id: " + id + ", 异常信息: \n" + e.getMessage());
                                logUtil.sendStatusLog(RunStatus.FAILED);
                            }
                        }
                        // 保存变量
                        if (tmp.getSetStr() != null) {
                            try {
                                StringBuffer sb = new StringBuffer(); // 记录保存参数信息
                                if (jsonHelper != null) {
                                    JSONObject parasJsonObject = JSONObject.parseObject(tmp.getSetStr()); // 待保存的的参数列表
                                    for (Map.Entry<String, Object> entry : parasJsonObject.entrySet()) {
                                        String key = entry.getKey(); // 得到变量名
                                        Object value = jsonHelper.get(entry.getValue().toString()); // 得到变量内容
                                        if (value == null) {
                                            throw new MyException("保存变量异常，未找到对应 key 的数据。 key：" + entry.getValue().toString());
                                        }
                                        context.addGlobalParas(key, value); // 将需要保存的变量存入 context 中
                                        sb.append("「" + key + ": " + value + "」"); // 添加入 保存参数信息
                                    }
                                }

                                if (!tmp.isLose()) {
                                    logUtil.sendStepLog(RunStatus.INFO, "", "BRTCWebSocket 保存变量", channel + sb.toString());
                                }
                            } catch (Throwable e) {
                                errorMessages.append("WebSocket 保存变量异常：" + tmp.getSetStr() + "\n");
                                errorMessages.append(e.getMessage());
                                errorMessages.append("\n");

                                logUtil.sendStepLog(RunStatus.ERROR, "", "BRTCWebSocket 保存变量", channel + "id: " + id + ", 期望保存：" + tmp.getSetStr() +"， " + e.getMessage());
                                logUtil.sendStatusLog(RunStatus.FAILED);
                            }
                        }

                        pendingResponseMessages.remove(id); // 最后无论成功或失败，从待处理的响应消息列表中移除
                    }
                } else { // 服务器主动推送过来的消息列表
                    boolean unknownMessage = true; // 未知的推送消息

                    // 遍历待处理的，服务器主动推送过来的消息列表
                    Iterator<PushMessages> it = pendingPushMessages.iterator();
                    while (it.hasNext()) {
                        PushMessages tmp = it.next();
                        // 断言
                        if (tmp.getAssertStr() != null) {
                            try {
                                Map<String, String> exp = new HashMap<>();
                                exp.put("body", tmp.getAssertStr());

                                com.bjy.qa.agent.response.Response response = new com.bjy.qa.agent.response.Response(text, null);
                                response.verify(exp);

                                if (!tmp.isLose()) {
                                    logUtil.sendStepLog(RunStatus.INFO, "", "", channel + "BRTCWebSocket receive: " + text);
                                    logUtil.sendStepLog(RunStatus.INFO, "", "BRTCWebSocket 断言", channel + "seq: " + seq + ", 期望：" + tmp.getAssertStr());
                                }

                                // 断言成功后，保存变量
                                if (tmp.getSetStr() != null) {
                                    try {
                                        StringBuffer sb = new StringBuffer(); // 记录保存参数信息
                                        if (jsonHelper != null) {
                                            JSONObject parasJsonObject = JSONObject.parseObject(tmp.getSetStr()); // 待保存的的参数列表
                                            for (Map.Entry<String, Object> entry : parasJsonObject.entrySet()) {
                                                String key = entry.getKey(); // 得到变量名
                                                Object value = jsonHelper.get(entry.getValue().toString()); // 得到变量内容
                                                if (value == null) {
                                                    throw new MyException("保存变量异常，未找到对应 key 的数据。 key：" + entry.getValue().toString());
                                                }
                                                context.addGlobalParas(key, value); // 将需要保存的变量存入 context 中
                                                sb.append("「" + key + ": " + value + "」"); // 添加入 保存参数信息
                                            }
                                        }

                                        if (!tmp.isLose()) {
                                            logUtil.sendStepLog(RunStatus.INFO, "", "BRTCWebSocket 保存变量", channel + sb.toString());
                                        }
                                    } catch (Throwable e) {
                                        errorMessages.append("WebSocket 保存变量异常：" + tmp.getSetStr() + "\n");
                                        errorMessages.append(e.getMessage());
                                        errorMessages.append("\n");

                                        logUtil.sendStepLog(RunStatus.ERROR, "", "BRTCWebSocket 保存变量", channel + "seq: " + seq + ", 期望保存：" + tmp.getSetStr() +"， " + e.getMessage());
                                        logUtil.sendStatusLog(RunStatus.FAILED);
                                    }
                                }

                                // 断言成功后，需要回复消息，发送消息
                                if (tmp.getSendText() != null) {
                                    sending(tmp.getSendText(), tmp.isLose(), null);
                                }

                                // 匹配成功推送消息，退出遍历
                                unknownMessage = false;
                                it.remove();
                                break;
                            } catch (Throwable e) {
                                // 断言异常表示，未匹配成功推送消息，继续遍历
                            }
                        }
                    }

                    // 未知消息上报日志
                    if (unknownMessage) {
                        logUtil.sendStepLog(RunStatus.ERROR, "", "BRTCWebSocket", channel + "收到未知的推送消息: " + text);
                        logUtil.sendStatusLog(RunStatus.FAILED);
                    }
                }
            }

            @Override
            public void onMessage(WebSocketHelper webSocketHelper, ByteString bytes) {
                throw new MyException("BRTCWebSocketService 不支持 ByteString 类型的消息: " + bytes);
            }

            @Override
            public void onClosing(WebSocketHelper webSocketHelper, int code, String reason) {
                opened = false;
            }

            @Override
            public void onClosed(WebSocketHelper webSocketHelper, int code, String reason) {
                opened = false;
            }

            @Override
            public void onFailure(WebSocketHelper webSocketHelper, Throwable throwable, Response response) {
                if (close) {
                    logUtil.sendStepLog(RunStatus.INFO, "", "BRTCWebSocket", channel + "WebSocket connection closed。Throwable : " + throwable + "。 Response: " + response);
                } else {
                    throwable.printStackTrace();
                    logger.error("onFailure。Throwable : " + throwable + "。 Response: " + response);
                    logUtil.sendStepLog(RunStatus.ERROR, "", "BRTCWebSocket", channel + "onFailure。Throwable : " + throwable + "。 Response: " + response);
                    logUtil.sendStatusLog(RunStatus.FAILED);
                }
            }
        });
    }

    /**
     * 发送消息
     * @param text 消息内容
     * @param lose 是否丢弃
     * @param responseMessages 待处理的响应信息。当收到服务器 push 消息后需要回复时，这个参数可能为 null，也就是不要添加到待处理的响应消息列表中，也就是服务器不会再回复消息
     */
    private void sending(String text, boolean lose, ResponseMessages responseMessages) {
        JSONObject sendJson = JSONObject.parseObject(text);
        if (sendJson.get("id") == null) { // 待发送消息中没有 id，补自增 id，有保持原有 id
            sendJson.put("id", this.autoId.addAndGet(1));
        }
        String sendText = sendJson.toJSONString();

        long timeOut = System.currentTimeMillis() + webSocketHelper.getTimeout() * 1000;
        while (true) {
            if (this.opened) {
                if (webSocketHelper.send(sendText)) {
                    // 如果 待处理的响应信息不为空，添加到待处理的响应消息列表中
                    if (responseMessages != null) {
                        responseMessages.setId(this.autoId.get());
                        pendingResponseMessages.put(this.autoId.get(), (ResponseMessages) responseMessages.clone());
                    }
                    if (!lose) {
                        logUtil.sendStepLog(RunStatus.INFO, "", "", channel + "BRTCWebSocket send: " + sendText);
                    }
                    return;
                } else {
                    throw new MyException("Websocket send failed, text: " + sendText);
                }
            }

            if (System.currentTimeMillis() >= timeOut) {
                throw new MyException("Websocket send timeout, text: " + sendText);
            }

            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
            }
        }
    }

    /**
     * 发送线程（需要定时发送时需要）
     */
    class SendThread extends Thread {
        long interval = 0; // 间隔发送时间
        String sendMsg; // 发送消息
        boolean lose; // 是否丢弃
        ResponseMessages responseMessages; // 待处理的响应信息

        /**
         * 构造函数
         * @param interval 间隔发送时间
         * @param sendMsg 发送消息
         * @param responseMessages 待处理的响应信息
         */
        public SendThread(long interval, String sendMsg, boolean lose, ResponseMessages responseMessages) {
            this.interval = interval;
            this.sendMsg = sendMsg;
            this.lose = lose;
            this.responseMessages = responseMessages;
        }

        @Override
        public void run() {
            while (true) {
                sending(sendMsg, lose, this.responseMessages);

                try {
                    sleep(interval);
                } catch (InterruptedException e) {
                    return;
                }
            }
        }
    }

    /**
     * 待处理的，请求后服务器响应的消息
     */
    class ResponseMessages implements Cloneable {
        private Long id; // 请求 id（发送请求消息的唯一标识）
        private String assertStr; // 断言 json 字符串
        private String setStr; // 保存变量 json 字符串
        private boolean lose; // 收到响应后是否丢弃

        public Long getId() {
            return id;
        }

        public void setId(Long id) {
            this.id = id;
        }

        public String getAssertStr() {
            return assertStr;
        }

        public void setAssertStr(String assertStr) {
            this.assertStr = assertStr;
        }

        public String getSetStr() {
            return setStr;
        }

        public void setSetStr(String setStr) {
            this.setStr = setStr;
        }

        public boolean isLose() {
            return lose;
        }

        public void setLose(boolean lose) {
            this.lose = lose;
        }

        @Override
        public String toString() {
            return "ResponseMessages{" +
                    "id=" + id +
                    ", assertStr='" + assertStr + '\'' +
                    ", setStr='" + setStr + '\'' +
                    ", lose=" + lose +
                    '}';
        }

        public ResponseMessages clone() {
            try {
                return (ResponseMessages) super.clone();
            } catch (CloneNotSupportedException e) {
                e.printStackTrace();
            }
            return null;
        }
    }

    /**
     * 待处理的，服务器主动推送过来的消息
     */
    class PushMessages implements Cloneable {
        private String assertStr; // 断言 json 字符串
        private String setStr; // 保存变量 json 字符串
        private boolean lose; // 收到响应后是否丢弃
        private String sendText; // 待发送的消息内容（当收到服务器 push 消息后，需要响应，所发送的内容）

        public String getAssertStr() {
            return assertStr;
        }

        public void setAssertStr(String assertStr) {
            this.assertStr = assertStr;
        }

        public String getSetStr() {
            return setStr;
        }

        public void setSetStr(String setStr) {
            this.setStr = setStr;
        }

        public boolean isLose() {
            return lose;
        }

        public void setLose(boolean lose) {
            this.lose = lose;
        }

        public String getSendText() {
            return sendText;
        }

        public void setSendText(String sendText) {
            this.sendText = sendText;
        }

        @Override
        public String toString() {
            return "PushMessages{" +
                    "assertStr='" + assertStr + '\'' +
                    ", setStr='" + setStr + '\'' +
                    ", lose=" + lose +
                    ", sendText='" + sendText + '\'' +
                    '}';
        }

        public PushMessages clone() {
            try {
                return (PushMessages) super.clone();
            } catch (CloneNotSupportedException e) {
                e.printStackTrace();
            }
            return null;
        }
    }
}